Perf: Window topn optimisation #21479

Open
SubhamSinghal wants to merge 5 commits into apache:main from SubhamSinghal:window-topn-partitioned-topk-exec

@SubhamSinghal
Contributor

Which issue does this PR close?

Rationale for this change

Queries like SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) AS rn FROM t) WHERE rn <= K are extremely common in analytics ("top N per group"). The current plan sorts the entire dataset in O(N log N), computes ROW_NUMBER for all rows, then filters. With 10M rows, 1K partitions, and K=3, we sort all 10M rows but keep only 3K.

This PR introduces a PartitionedTopKExec operator that replaces the SortExec, maintaining a per-partition TopK heap (reusing DataFusion's existing TopK implementation). Cost drops to O(N log K) time and O(K × P × row_size) memory, where N is the number of input rows and P is the number of distinct partitions.
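
As a rough back-of-the-envelope check of the example above (N = 10^7 rows, P = 10^3 partitions, K = 3), the comparison counts can be estimated as follows. This counts only comparisons and ignores constant factors, so it is an upper bound on the benefit rather than a prediction of wall-clock speedup:

```latex
\begin{align*}
N \log_2 N &\approx 10^7 \times 23.3 \approx 2.3 \times 10^8 \\
N \log_2 K &\approx 10^7 \times 1.6 \approx 1.6 \times 10^7 \\
K \times P &= 3 \times 10^3 = 3000 \text{ rows retained}
\end{align*}
```

That is roughly a 15x reduction in comparisons on paper. Per-partition bookkeeping is not captured by this estimate.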

What changes are included in this PR?

New physical operator: PartitionedTopKExec (physical-plan/src/sorts/partitioned_topk.rs)

  • Reads unsorted input, groups rows by partition key using RowConverter, feeds sub-batches to a per-partition TopK heap
  • Emits only the top-K rows per partition in sorted (partition_keys, order_keys) order
  • Reuses the existing TopK implementation for heap management, sort key comparison, eviction, and batch compaction
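
The per-partition heap idea described in the bullets above can be sketched with the standard library alone. This is a minimal illustration, not the PR's implementation: it swaps in `std::collections::BinaryHeap` for DataFusion's TopK, uses plain `(pk, val)` integer tuples instead of Arrow batches and RowConverter, and the function name `partitioned_top_k` is hypothetical.

```rust
use std::collections::{BTreeMap, BinaryHeap};

/// Keep the `k` smallest `val`s for every partition key `pk`.
/// Hypothetical stand-in: the real operator works on Arrow record batches.
fn partitioned_top_k(rows: &[(i32, i32)], k: usize) -> Vec<(i32, i32)> {
    if k == 0 {
        return Vec::new();
    }
    // BTreeMap iterates in key order, so output is sorted by partition key.
    // Each BinaryHeap is a max-heap holding at most `k` values.
    let mut heaps: BTreeMap<i32, BinaryHeap<i32>> = BTreeMap::new();
    for &(pk, val) in rows {
        let heap = heaps.entry(pk).or_default();
        if heap.len() < k {
            heap.push(val);
        } else if let Some(&worst) = heap.peek() {
            // Heap is full: replace the current worst (largest) value
            // only if the new value ranks ahead of it.
            if val < worst {
                heap.pop();
                heap.push(val);
            }
        }
    }
    let mut out = Vec::new();
    for (pk, heap) in heaps {
        // into_sorted_vec yields ascending order, i.e. ORDER BY val ASC.
        for val in heap.into_sorted_vec() {
            out.push((pk, val));
        }
    }
    out
}
```

Calling `partitioned_top_k(&rows, 2)` returns at most two rows per distinct `pk`, ordered by `(pk, val)`. Each input row costs at most one O(log K) heap update, and memory is bounded by K entries per partition.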

New optimizer rule: WindowTopN (physical-optimizer/src/window_topn.rs)

Detects the pattern:

FilterExec(rn <= K)
  [optional ProjectionExec]
    BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
      SortExec(partition_keys, order_keys)

And replaces it with:

[optional ProjectionExec]
  BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
    PartitionedTopKExec(fetch=K)

Both FilterExec and SortExec are removed.
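
The shape of this rewrite can be sketched on a toy plan tree. The `Plan` enum and `rewrite_window_topn` below are hypothetical and exist only for illustration; the actual rule operates on DataFusion's `ExecutionPlan` trait objects and also checks the window function, partition keys, and predicate, which this sketch omits.

```rust
/// Toy physical plan (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Sort { input: Box<Plan> },
    RowNumberWindow { input: Box<Plan> },
    Projection { input: Box<Plan> },
    Filter { rn_limit: usize, input: Box<Plan> },
    PartitionedTopK { fetch: usize, input: Box<Plan> },
}

/// Returns the rewritten plan, or None when the pattern does not match.
fn rewrite_window_topn(plan: &Plan) -> Option<Plan> {
    // Step 1: a filter of the form `rn <= K` at the top.
    let Plan::Filter { rn_limit, input } = plan else {
        return None;
    };
    // Step 2: an optional projection between the filter and the window.
    let (has_projection, window) = match &**input {
        Plan::Projection { input } => (true, &**input),
        other => (false, other),
    };
    // Step 3: a ROW_NUMBER window sitting directly on a sort.
    let Plan::RowNumberWindow { input: sort } = window else {
        return None;
    };
    let Plan::Sort { input: child } = &**sort else {
        return None;
    };
    // Step 4: drop the filter and the sort, insert the top-K operator.
    let topk = Plan::PartitionedTopK { fetch: *rn_limit, input: child.clone() };
    let window = Plan::RowNumberWindow { input: Box::new(topk) };
    Some(if has_projection {
        Plan::Projection { input: Box::new(window) }
    } else {
        window
    })
}
```

Returning `None` for any non-matching shape leaves the original plan untouched, which keeps the rule conservative.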

Supported predicates: rn <= K, rn < K, K >= rn, K > rn.
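
Because ROW_NUMBER produces integers starting at 1, the strict forms are equivalent to a limit of K - 1 (rn < K is the same as rn <= K - 1). A minimal sketch of how the four predicate shapes could be normalized into a fetch value follows. The `Op` and `Operand` types and the function `fetch_from_predicate` are hypothetical, and returning `None` for a non-positive limit is an assumption of this sketch, not something the PR description states.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Lt,
    LtEq,
    Gt,
    GtEq,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Operand {
    RowNumber,    // the ROW_NUMBER() output column
    Literal(i64), // an integer constant K
}

/// Map a comparison onto a per-partition fetch limit, if it has a
/// supported shape. All names here are illustrative.
fn fetch_from_predicate(left: Operand, op: Op, right: Operand) -> Option<usize> {
    use Operand::{Literal, RowNumber};
    let limit = match (left, op, right) {
        // rn <= K  and  K >= rn  keep rows 1..=K.
        (RowNumber, Op::LtEq, Literal(k)) | (Literal(k), Op::GtEq, RowNumber) => k,
        // rn < K  and  K > rn  keep rows 1..=K-1.
        (RowNumber, Op::Lt, Literal(k)) | (Literal(k), Op::Gt, RowNumber) => {
            k.checked_sub(1)?
        }
        // Anything else (e.g. rn > K) is not a top-K filter.
        _ => return None,
    };
    // Assumption: a non-positive limit is left to the regular plan.
    if limit > 0 {
        Some(limit as usize)
    } else {
        None
    }
}
```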

The rule only fires for ROW_NUMBER with a PARTITION BY clause. Global top-K (no PARTITION BY) is already handled by SortExec with fetch.

Config flag: datafusion.optimizer.enable_window_topn (default: true)

Benchmark results (H2O groupby Q8, 10M rows, top-2 per partition):

cargo run --release --example h2o_window_topn_bench

| Scenario | Enabled (ms) | Disabled (ms) | Speedup |
|---|---|---|---|
| 100 partitions (100K rows/part) | 43 | 174 | 4.0x |
| 1K partitions (10K rows/part) | 71 | 146 | 2.1x |
| 10K partitions (1K rows/part) | 619 | 128 | 0.2x (regression) |
| 100K partitions (100 rows/part) | 4368 | 135 | 0.03x (regression) |

The regressions at 10K and 100K partitions are expected: per-partition TopK overhead (RowConverter, MemoryReservation per instance) dominates when partitions are very numerous with few rows each. For the common case (moderate partition cardinality, up to about 1K partitions in this benchmark), the optimization provides a 2-4x speedup.

Are these changes tested?

Yes:

  • 7 unit tests (core/tests/physical_optimizer/window_topn.rs): basic ROW_NUMBER, rn < K, flipped predicates, non-window column filter, config disabled, no partition by, projection between filter and window
  • 5 SLT tests (sqllogictest/test_files/window_topn.slt): correctness verification, EXPLAIN plan validation, rn < K, no-partition-by case, config disabled fallback

Are there any user-facing changes?

No breaking API changes. The optimization is enabled by default and transparent to users. It can be disabled via:

SET datafusion.optimizer.enable_window_topn = false;

@github-actions bot added the optimizer, core, sqllogictest, common, and physical-plan labels Apr 8, 2026

Contributor

@2010YOUY01 left a comment

Thank you — this PR looks really nice.

I took a quick look and left a few suggestions. I’ll review the optimizer rewrite and execution side more carefully later.

// specific language governing permissions and limitations
// under the License.

// Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled

Contributor

We could keep this benchmark in this PR, but it would be great to clean it up later.
To make benchmark maintenance easier, we could directly add queries representing this workload to h2o window benchmark, so that similar benchmarks won't get scattered to multiple places.

h2o_small_window: Extended h2oai benchmark with small dataset (1e7 rows) for window, default file format is csv

One issue, though: the h2o benchmark currently counts dataset loading time, so we can't isolate the target executor's processing time. We could add an option to exclude the loading time later 🤔

// Step 1: Match FilterExec at the top
let filter = plan.downcast_ref::<FilterExec>()?;

// Don't handle filters with projections

Contributor

I'm curious why this case is skipped.

Contributor Author

The filter's column indices would point to the projected schema, not the window exec's output schema, so our index-based matching for the ROW_NUMBER column would be wrong without resolving the projection mapping. Skipping this case for simplicity right now.
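
To illustrate the index mismatch described above, here is a minimal sketch of what resolving the projection mapping could look like. It assumes a projection is represented as a list of input column indices (`projection[i]` is the input column that output column `i` reads from). Both function names are hypothetical, and this is not code from the PR.

```rust
/// Translate an index in the projected schema back to the input schema.
/// Returns None when the index is out of range.
fn resolve_through_projection(projected_idx: usize, projection: &[usize]) -> Option<usize> {
    projection.get(projected_idx).copied()
}

/// Check whether a filter column refers to the ROW_NUMBER column of the
/// window operator's output, looking through an optional projection.
fn filter_targets_row_number(
    filter_col: usize,
    projection: Option<&[usize]>,
    row_number_col: usize,
) -> bool {
    let resolved = match projection {
        Some(p) => resolve_through_projection(filter_col, p),
        // Without a projection the two schemas share the same indices.
        None => Some(filter_col),
    };
    resolved == Some(row_number_col)
}
```

For a window output schema `[id, pk, val, rn]` (so `rn` is column 3) and a projection `[1, 3]`, a filter on projected column 1 resolves to input column 3. A naive index comparison of 1 against 3 would miss the match.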

)?))
}

fn apply_expressions(

Contributor

Not related to this PR, but I’m curious why this is a required ExecutionPlan API and when it is used, given that different operators can hold expressions for very different purposes 🤔

# Tests for Window TopN optimization: PartitionedTopKExec

statement ok
CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES

Contributor

I suggest moving the main test coverage here, instead of keeping it in unit tests across different layers such as optimizer tests. Once we have solid coverage here, it is less likely to get lost during local refactors.

We can also extend the coverage with more edge cases, for example:

  • predicates such as rn < 2, 2 > rn, etc.
  • mixing other window expressions with row_number()
  • empty or overlapping partition / order keys, such as ... OVER (ORDER BY id) or ... OVER (PARTITION BY id ORDER BY id, customer)
  • different sort options such as ASC, DESC, and NULLS FIRST
  • the QUALIFY clause https://datafusion.apache.org/user-guide/sql/select.html#qualify-clause
  • and more

Contributor Author

Added tests for these cases.

@2010YOUY01 changed the title from "Benchmark: Window topn optimisation" to "Perf: Window topn optimisation" Apr 9, 2026
@github-actions bot added the documentation label Apr 9, 2026
2 participants